# home *** CD-ROM | disk | FTP | other *** search  (navigation text left over from the archive web page; not part of the module)
- # Source Generated with Decompyle++
- # File: in.pyo (Python 2.5)
-
- from weakref import WeakValueDictionary
- import cb
- from M2Crypto import util, BIO, Err, RSA, m2, X509
-
class _ctxmap:
    """Mapping wrapper whose values are held through weak references.

    Items are stored in a ``WeakValueDictionary`` exposed as ``self.map``;
    item access, assignment and deletion are forwarded to it unchanged.
    The class attribute ``singleton`` starts out as None and is the slot
    the module-level ``map()`` accessor uses for the shared instance.
    """

    singleton = None

    def __init__(self):
        # Weak values: an entry does not keep its value object alive.
        self.map = WeakValueDictionary()

    def __getitem__(self, key):
        """Return the value stored under *key* (lookup errors propagate)."""
        return self.map[key]

    def __setitem__(self, key, value):
        """Store *value* under *key*."""
        self.map[key] = value

    def __delitem__(self, key):
        """Remove the entry stored under *key*."""
        del self.map[key]
-
-
-
def map():
    """Return the shared ``_ctxmap`` instance, creating it on first use.

    The instance is cached in ``_ctxmap.singleton``, so every call returns
    the same object. Note that this name shadows the builtin ``map`` inside
    this module.
    """
    shared = _ctxmap.singleton
    if shared is None:
        shared = _ctxmap.singleton = _ctxmap()
    return shared
-
-
class Context:
    """Object wrapper around an ``m2`` SSL context handle (``self.ctx``).

    On construction the instance registers itself in the module-level
    context map under ``long(self.ctx)``; ``close()`` removes that entry.
    The handle itself is released through ``m2.ssl_ctx_free`` when the
    instance is finalised.
    """

    # NOTE(review): presumably cached on the class so that __del__ does not
    # depend on the ``m2`` module global still being alive -- confirm.
    m2_ssl_ctx_free = m2.ssl_ctx_free

    def __init__(self, protocol='sslv23', weak_crypto=None):
        """Create the underlying context for *protocol*.

        protocol    -- name prefix of an ``m2.<protocol>_method`` factory.
        weak_crypto -- when left as None, the cipher list is restricted to
                       'ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH' and, for the
                       'sslv23' protocol only, the options
                       SSL_OP_ALL | SSL_OP_NO_SSLv2 are set.  Any other
                       value skips both steps.

        Raises ValueError when ``m2`` has no ``<protocol>_method`` attribute.
        """
        proto = getattr(m2, protocol + '_method', None)
        if proto is None:
            raise ValueError("no such protocol '%s'" % protocol)

        self.ctx = m2.ssl_ctx_new(proto())
        self.allow_unknown_ca = 0
        # Register under the numeric value of the handle; close() removes
        # the entry using the same key.
        map()[long(self.ctx)] = self
        m2.ssl_ctx_set_cache_size(self.ctx, 128)
        if weak_crypto is None:
            if protocol == 'sslv23':
                self.set_options(m2.SSL_OP_ALL | m2.SSL_OP_NO_SSLv2)
            # Applies to every protocol, not just 'sslv23'.
            self.set_cipher_list('ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH')

    def __del__(self):
        # ``ctx`` is missing when __init__ raised before creating it.
        if getattr(self, 'ctx', None):
            self.m2_ssl_ctx_free(self.ctx)

    def close(self):
        """Remove this context from the module-level context map.

        The lookup error from the map propagates if the entry is no longer
        present (for example on a second call).
        """
        del map()[long(self.ctx)]

    def _use_privkey(self, keyfile):
        """Load the private key from *keyfile* and check it against the
        certificate already loaded into the context.

        Raises ValueError when ``m2.ssl_ctx_check_privkey`` reports failure.
        """
        m2.ssl_ctx_use_privkey(self.ctx, keyfile)
        if not m2.ssl_ctx_check_privkey(self.ctx):
            raise ValueError('public/private key mismatch')

    def load_cert(self, certfile, keyfile=None, callback=util.passphrase_callback):
        """Load a certificate and its private key into the context.

        certfile -- file passed to ``m2.ssl_ctx_use_cert``.
        keyfile  -- file holding the private key; when falsy, *certfile*
                    is used for the key as well.
        callback -- passphrase callback installed on the context before
                    anything is loaded.

        Raises ValueError if the private key check fails.
        """
        m2.ssl_ctx_passphrase_callback(self.ctx, callback)
        m2.ssl_ctx_use_cert(self.ctx, certfile)
        self._use_privkey(keyfile or certfile)

    def load_cert_chain(self, certchainfile, keyfile=None, callback=util.passphrase_callback):
        """Load a certificate chain and its private key into the context.

        certchainfile -- file passed to ``m2.ssl_ctx_use_cert_chain``.
        keyfile       -- file holding the private key; when falsy,
                         *certchainfile* is used for the key as well.
        callback      -- passphrase callback installed on the context
                         before anything is loaded.

        Raises ValueError if the private key check fails.
        """
        m2.ssl_ctx_passphrase_callback(self.ctx, callback)
        m2.ssl_ctx_use_cert_chain(self.ctx, certchainfile)
        self._use_privkey(keyfile or certchainfile)

    def set_client_CA_list_from_file(self, cafile):
        """Pass *cafile* to ``m2.ssl_ctx_set_client_CA_list_from_file``."""
        m2.ssl_ctx_set_client_CA_list_from_file(self.ctx, cafile)

    # Alternative spellings of the same method.
    load_client_CA = load_client_ca = set_client_CA_list_from_file

    def load_verify_locations(self, cafile=None, capath=None):
        """Pass *cafile* / *capath* to ``m2.ssl_ctx_load_verify_locations``
        and return its result.

        Raises ValueError when both arguments are None.
        """
        if cafile is None and capath is None:
            raise ValueError('cafile and capath can not both be None.')

        return m2.ssl_ctx_load_verify_locations(self.ctx, cafile, capath)

    # Alternative spelling of the same method.
    load_verify_info = load_verify_locations

    def set_session_id_ctx(self, id):
        """Set the session id context to *id*.

        Raises Err.SSLError (carrying ``Err.get_error_code()``) when the
        underlying ``m2`` call returns a false value.
        """
        ret = m2.ssl_ctx_set_session_id_context(self.ctx, id)
        if not ret:
            raise Err.SSLError(Err.get_error_code(), '')

    def set_allow_unknown_ca(self, ok):
        """Store *ok* in ``self.allow_unknown_ca``."""
        self.allow_unknown_ca = ok

    def get_allow_unknown_ca(self):
        """Return the value stored in ``self.allow_unknown_ca``."""
        return self.allow_unknown_ca

    def set_verify(self, mode, depth, callback=None):
        """Set the verification mode, depth and optional callback.

        Without a *callback* the mode is installed through
        ``m2.ssl_ctx_set_verify_default``; with one, through
        ``m2.ssl_ctx_set_verify``.  The depth is applied in both cases.
        """
        if callback is None:
            m2.ssl_ctx_set_verify_default(self.ctx, mode)
        else:
            m2.ssl_ctx_set_verify(self.ctx, mode, callback)
        # Deliberately outside the if/else: depth is set on both paths.
        m2.ssl_ctx_set_verify_depth(self.ctx, depth)

    def get_verify_mode(self):
        """Return the result of ``m2.ssl_ctx_get_verify_mode``."""
        return m2.ssl_ctx_get_verify_mode(self.ctx)

    def get_verify_depth(self):
        """Return the result of ``m2.ssl_ctx_get_verify_depth``."""
        return m2.ssl_ctx_get_verify_depth(self.ctx)

    def set_tmp_dh(self, dhpfile):
        """Read DH parameters from *dhpfile* and install them on the
        context, returning the result of ``m2.ssl_ctx_set_tmp_dh``.
        """
        f = BIO.openfile(dhpfile)
        dhp = m2.dh_read_parameters(f.bio_ptr())
        return m2.ssl_ctx_set_tmp_dh(self.ctx, dhp)

    def set_tmp_dh_callback(self, callback=None):
        """Install *callback* as the temporary-DH callback.

        Does nothing when *callback* is None.
        """
        if callback is not None:
            m2.ssl_ctx_set_tmp_dh_callback(self.ctx, callback)

    def set_tmp_rsa(self, rsa):
        """Install *rsa* as the temporary RSA key and return the result of
        ``m2.ssl_ctx_set_tmp_rsa``.

        Raises TypeError when *rsa* is not an ``RSA.RSA`` instance.
        """
        if not isinstance(rsa, RSA.RSA):
            raise TypeError('Expected an instance of RSA.RSA, got %s.' % (rsa,))
        return m2.ssl_ctx_set_tmp_rsa(self.ctx, rsa.rsa)

    def set_tmp_rsa_callback(self, callback=None):
        """Install *callback* as the temporary-RSA callback.

        Does nothing when *callback* is None.
        """
        if callback is not None:
            m2.ssl_ctx_set_tmp_rsa_callback(self.ctx, callback)

    def set_info_callback(self, callback=cb.ssl_info_callback):
        """Install *callback* (default ``cb.ssl_info_callback``) as the
        info callback.
        """
        m2.ssl_ctx_set_info_callback(self.ctx, callback)

    def set_cipher_list(self, cipher_list):
        """Pass *cipher_list* to ``m2.ssl_ctx_set_cipher_list`` and return
        its result.
        """
        return m2.ssl_ctx_set_cipher_list(self.ctx, cipher_list)

    def add_session(self, session):
        """Add *session* (via ``session._ptr()``) to the context and return
        the result of ``m2.ssl_ctx_add_session``.
        """
        return m2.ssl_ctx_add_session(self.ctx, session._ptr())

    def remove_session(self, session):
        """Remove *session* (via ``session._ptr()``) from the context and
        return the result of ``m2.ssl_ctx_remove_session``.
        """
        return m2.ssl_ctx_remove_session(self.ctx, session._ptr())

    def get_session_timeout(self):
        """Return the result of ``m2.ssl_ctx_get_session_timeout``."""
        return m2.ssl_ctx_get_session_timeout(self.ctx)

    def set_session_timeout(self, timeout):
        """Pass *timeout* to ``m2.ssl_ctx_set_session_timeout`` and return
        its result.
        """
        return m2.ssl_ctx_set_session_timeout(self.ctx, timeout)

    def set_session_cache_mode(self, mode):
        """Pass *mode* to ``m2.ssl_ctx_set_session_cache_mode`` and return
        its result.
        """
        return m2.ssl_ctx_set_session_cache_mode(self.ctx, mode)

    def get_session_cache_mode(self):
        """Return the result of ``m2.ssl_ctx_get_session_cache_mode``."""
        return m2.ssl_ctx_get_session_cache_mode(self.ctx)

    def set_options(self, op):
        """Pass *op* to ``m2.ssl_ctx_set_options`` and return its result."""
        return m2.ssl_ctx_set_options(self.ctx, op)

    def get_cert_store(self):
        """Return an ``X509.X509_Store`` wrapping the context's certificate
        store as obtained from ``m2.ssl_ctx_get_cert_store``.
        """
        return X509.X509_Store(m2.ssl_ctx_get_cert_store(self.ctx))
-
-
-